Hey Arvid,

Just a quick comment on Arvid's mail for now. It should be safe to
update the Avro version even though we declare a dependency on Avro
1.8.2 by default. Moreover, so far we have not bundled any version of
Avro in any of the uber jars we ship. It is true that we used Avro
1.8.2 by default because that is the version Hadoop ships with (the
Hadoop distributions actually bundle the Avro dependency as part of
their binaries).

As for the other issue: because Hadoop is no longer the most common
environment Flink runs in and, as you said, they are not the fastest at
upgrading dependencies, we decided to upgrade the default Avro version
that Flink declares. From Flink 1.12 on we depend on Avro 1.10 by
default; this has already been merged into master [1]. Users should
still be able to downgrade the Avro version if they need to (e.g. if
they have specific records generated with older versions or they use
Hadoop).

@Lian I will look further into the issue. My suspicion, though, is that
there is a problem with the Conversions your generated class declares.
For Flink to handle logical types correctly, the generated Avro class
must return valid Conversions via SpecificRecordBase#getConversion().
Could you share the Avro schema and the generated class? Without the
full picture it will be hard to track down the problem. Again, an
example that I can run would be best.
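
For reference, a class generated by a recent avro-maven-plugin (Avro
1.9+) for a record with a timestamp-millis field contains roughly the
following excerpt (the field index and conversion shown here are made
up, so treat this only as a sketch):

    // hypothetical excerpt from an Avro-generated SpecificRecordBase subclass;
    // only the logical-type plumbing is shown, everything else is omitted
    private static final org.apache.avro.Conversion<?>[] conversions =
        new org.apache.avro.Conversion<?>[] {
            // field 0: timestamp-millis -> java.time.Instant
            new org.apache.avro.data.TimeConversions.TimestampMillisConversion(),
            null // trailing null emitted by the generator
        };

    @Override
    public org.apache.avro.Conversion<?> getConversion(int field) {
        return conversions[field];
    }

If the class was generated without these conversions, the
SpecificDatumReader hands the raw long to the record's put() method,
which would match the ClassCastException you are seeing.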

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-18192

On 21/09/2020 08:04, Arvid Heise wrote:
> Hi Lian,
>
> We had a similar discussion in [1].
>
> TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2]
> until Hive bumps it [3]. In the thread, I gave some options to avoid
> running into the issue.
> The easiest fix is to use Avro 1.8.2 all the way, but you may run into
> [4] if your logical type is nullable (which is not necessary in most
> cases).
>
> Still, I think it's time for us to revise the decision to wait for
> Hive to bump and rather upgrade independently. Avro was for a long
> time stuck on 1.8 but the project gained traction again in the past
> two years. On the other hand, Hive seems to be rather slow to respond
> to that, and we shouldn't let a slow-moving component block us from
> supporting a fast-moving component when it's so apparent that users want it.
> @Aljoscha Krettek <mailto:aljos...@apache.org> could you please pick
> that topic up and ping the respective maintainers?
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
> [2] https://issues.apache.org/jira/browse/FLINK-12532
> [3] https://issues.apache.org/jira/browse/HIVE-21737
> [4] https://issues.apache.org/jira/browse/AVRO-1891
>
> On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang <jiangok2...@gmail.com
> <mailto:jiangok2...@gmail.com>> wrote:
>
>     Thanks Dawid for proposing ConfluentRegistryDeserializationSchema.
>     I am trying ConfluentRegistryAvroDeserializationSchema (if this is
>     what you mean) but got "java.lang.Long cannot be cast to
>     java.time.Instant". This may be caused by
>     https://issues.apache.org/jira/browse/FLINK-11030. Is there any
>     progress on this JIRA? Thanks. Regards!
>
>
>     Stacktrace:
>     java.lang.ClassCastException: java.lang.Long cannot be cast to java.time.Instant
>     at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
>     at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
>     at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
>     at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
>     at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
>     at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
>     at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
>     at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
>     at com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
>     at org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
>
>     Code:
>
>     import org.apache.avro.specific.SpecificRecord;
>     import org.apache.flink.api.common.typeinfo.TypeInformation;
>     import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
>     import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
>     import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
>     import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
>     import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>     import org.apache.kafka.clients.producer.ProducerRecord;
>
>     import javax.annotation.Nullable;
>     import java.io.Serializable;
>
>     public class SpecificRecordSerDe<T extends SpecificRecord> implements
>             KafkaSerializationSchema<T>, KafkaContextAware<T>,
>             KafkaDeserializationSchema<T>, Serializable {
>
>         private final Class<T> tClass;
>         private String topic;   // for serializer
>         private String subject; // for serializer
>         private final String schemaRegistryUrl;
>         private ConfluentRegistryAvroSerializationSchema<T> serializer;
>         private ConfluentRegistryAvroDeserializationSchema<T> deserializer;
>
>         private static final Object lock = new Object();
>
>         public static <T extends SpecificRecord> SpecificRecordSerDe<T> forDeserializer(
>                 final Class<T> tClass, String schemaRegistryUrl) {
>             return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl);
>         }
>
>         public static <T extends SpecificRecord> SpecificRecordSerDe<T> forSerializer(
>                 final Class<T> tClass,
>                 String schemaRegistryUrl,
>                 final String topic,
>                 final String subject) {
>             return new SpecificRecordSerDe<>(tClass, schemaRegistryUrl, topic, subject);
>         }
>
>         private SpecificRecordSerDe(final Class<T> tClass, String schemaRegistryUrl) {
>             this.tClass = tClass;
>             this.schemaRegistryUrl = schemaRegistryUrl;
>         }
>
>         private SpecificRecordSerDe(final Class<T> tClass,
>                                     final String schemaRegistryUrl,
>                                     final String topic,
>                                     final String subject) {
>             this(tClass, schemaRegistryUrl);
>             this.topic = topic;
>             this.subject = subject;
>         }
>
>         @Override
>         public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
>             // lazily create the serializer on first use
>             if (this.serializer == null) {
>                 synchronized (lock) {
>                     if (this.serializer == null) {
>                         this.serializer = ConfluentRegistryAvroSerializationSchema
>                                 .forSpecific(tClass, this.subject, this.schemaRegistryUrl);
>                     }
>                 }
>             }
>
>             byte[] bytes = this.serializer.serialize(element);
>             return new ProducerRecord<>(this.topic, bytes);
>         }
>
>         @Override
>         public boolean isEndOfStream(T nextElement) {
>             return false;
>         }
>
>         @Override
>         public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
>             // lazily create the deserializer on first use
>             if (deserializer == null) {
>                 synchronized (lock) {
>                     if (deserializer == null) {
>                         deserializer = ConfluentRegistryAvroDeserializationSchema
>                                 .forSpecific(tClass, this.schemaRegistryUrl);
>                     }
>                 }
>             }
>
>             return deserializer.deserialize(record.value());
>         }
>
>         @Override
>         public String getTargetTopic(T element) {
>             return this.topic;
>         }
>
>         @Override
>         public TypeInformation<T> getProducedType() {
>             return TypeInformation.of(tClass);
>         }
>     }
>
>
>
>
>     On Thu, Sep 17, 2020 at 9:42 AM Dawid Wysakowicz
>     <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>
>         Thanks for the update.
>
>         First of all, why did you decide to build your own
>         DeserializationSchema instead of using
>         ConfluentRegistryDeserializationSchema? Your implementation is
>         quite inefficient: you deserialize > serialize > deserialize.
>         Serialization/deserialization is usually one of the heaviest
>         operations in the pipeline.
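>
>         Just to illustrate (a rough sketch only, with a placeholder
>         topic, registry URL and Kafka properties, and assuming the
>         PayloadRecord class from your snippet), wiring in the schema
>         that ships with Flink directly would look roughly like this:
>
>         ConfluentRegistryAvroDeserializationSchema<PayloadRecord> schema =
>                 ConfluentRegistryAvroDeserializationSchema.forSpecific(
>                         PayloadRecord.class, "http://schema-registry:8081");
>
>         FlinkKafkaConsumer<PayloadRecord> consumer =
>                 new FlinkKafkaConsumer<>("input-topic", schema, kafkaProperties);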
>
>         What do you return in your getProducedType? From the stack
>         trace I guess you are instantiating the AvroTypeInfo? Could
>         you maybe share a full runnable example? It would make it much
>         easier to help you.
>
>         Moreover, the pattern of registering custom conversions in a
>         SpecificData will not work with the AvroSerializer. Custom
>         conversions should be defined in the generated SpecificRecord
>         (in your case PayloadRecord) via
>         SpecificRecordBase#getConversion().
>
>         Best,
>
>         Dawid
>
>
>         On 17/09/2020 16:34, Lian Jiang wrote:
>>         Piotr/Dawid,
>>
>>         Thanks for the reply. FLINK-18223 seems not to be related to
>>         this issue, and I double-checked that I am using Flink 1.11.0
>>         instead of 1.10.0. My mistake.
>>         StreamExecutionEnvironment#getConfig()#enableObjectReuse()
>>         solved the issue.
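>>
>>         For reference, enabling it is just (a minimal sketch):
>>
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         // with object reuse enabled, Flink skips the defensive deep copy
>>         // (AvroSerializer#copy) between chained operators
>>         env.getConfig().enableObjectReuse();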
>>
>>         I am not using ConfluentRegistryDeserializationSchema.
>>         Instead, I am creating a custom DeserializationSchema:
>>
>>
>>         /*
>>         the deser class
>>         */
>>         public class SpecificRecordSerDe<T extends SpecificRecord> implements
>>                 KafkaSerializationSchema<T>, KafkaContextAware<T>,
>>                 KafkaDeserializationSchema<T>, Serializable {
>>
>>             private final Class<T> tClass;
>>             private final String tSchemaStr;
>>             private volatile transient Schema tSchema;
>>             private String topic;
>>             private String schemaRegistryUrl;
>>             private KafkaAvroSerializer serializer;
>>             private KafkaAvroDecoder decoder;
>>
>>             public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr,
>>                                        String schemaRegistryUrl) {
>>                 this.tClass = tClass;
>>                 this.tSchemaStr = tSchemaStr;
>>                 this.topic = null;
>>                 this.schemaRegistryUrl = schemaRegistryUrl;
>>             }
>>
>>             @Override
>>             public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
>>                 CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
>>                         schemaRegistryUrl,
>>                         4);
>>
>>                 decoder = new KafkaAvroDecoder(client);
>>                 GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());
>>                 DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(
>>                         generic.getSchema(), ManagedSpecificData.getForClass(tClass));
>>                 ByteArrayOutputStream out = new ByteArrayOutputStream();
>>                 Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>>                 writer.write(generic, encoder);
>>                 encoder.flush();
>>
>>                 byte[] avroData = out.toByteArray();
>>                 out.close();
>>
>>                 tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
>>                 SpecificDatumReader<T> reader = new SpecificDatumReader<>(
>>                         generic.getSchema(), tSchema, ManagedSpecificData.getForClass(tClass));
>>                 Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
>>                 T res = reader.read(null, anotherDecoder);
>>
>>                 return res;
>>             }
>>         }
>>
>>
>>         /*
>>         the specificData class
>>         */
>>         public class ManagedSpecificData extends SpecificData {
>>             private static ManagedSpecificData getManagedSpecificData() {
>>                 ManagedSpecificData res = new ManagedSpecificData();
>>
>>                 registerAdvancedType(new TimestampMillisType(), res);
>>                 registerAdvancedType(new LocalDateType(), res);
>>
>>                 return res;
>>             }
>>         }
>>
>>         /*
>>         how we use above deser class
>>         */
>>         SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
>>                 PayloadRecord.class,
>>                 PayloadRecord.getClassSchema().toString(),
>>                 this.schemaRegistry);
>>
>>         FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(
>>                 this.inputTopic,
>>                 deserializer,
>>                 this.sourceSettings);
>>
>>
>>         Thanks
>>         Lian
>>
>>
>>
>>         On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz
>>         <dwysakow...@apache.org <mailto:dwysakow...@apache.org>> wrote:
>>
>>             Hi,
>>
>>             Could you share exactly how you configure Avro & Kafka? Do
>>             you use the Table API or the DataStream API? Do you use the
>>             ConfluentRegistryDeserializationSchema that comes with Flink
>>             or did you build a custom DeserializationSchema? Could you
>>             maybe share the code for instantiating the source with us?
>>             It would help us track down the problematic spot.
>>
>>             Best,
>>
>>             Dawid
>>
>>             On 16/09/2020 08:09, Lian Jiang wrote:
>>             > Hi,
>>             >
>>             > I am using Avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In
>>             > IntelliJ, I can see that the FlinkKafkaConsumer has already
>>             > deserialized the upstream Kafka message. However, I get the
>>             > error below when this message is serialized during
>>             > pushToOperator. Per the stack trace, the reason is that the
>>             > AvroSerializer is created by AvroFactory.fromSpecific(), which
>>             > creates its own private copy of SpecificData. This private
>>             > SpecificData does not have the logical type information. This
>>             > blocks the deserialized messages from being passed to
>>             > downstream operators. Any idea how to make this work?
>>             > Appreciated very much!
>>             >
>>             >
>>             > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z
>>             > at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
>>             > at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>             > at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
>>             > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
>>             > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>>             > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
>>             > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>>             > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
>>             > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>>             > at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
>>             > at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
>>             > at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>             > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>>             > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>>             > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>>             > at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>             > at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>             > at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>             > at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>             > at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>>             > at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>>             > at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>             > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>             > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>
>>
>>
>>
>
>
>
>
>
>
>
> -- 
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache
> Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
