Hi Stephan,

I went through one of the old mail threads:
http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E


Here it is mentioned that when reading from Kafka you are expected to define a 
DeserializationSchema. There is no out-of-the-box (de)serializer for Flink with 
Kafka, but it should not be very hard to add.



I have some questions:



1. As per FLINK-3691, you are adding GenericDatumReader, so I suppose I 
need to use it instead of DatumReader in my DeserializationSchema, which is 
required to read data from Kafka?



2. What is the recommended way to read AVRO binary data from Kafka if I have 
the AVRO schema file [*.avsc]? Is there a better, more efficient 
approach?



3. Can AvroInputFormat be used to read Kafka data, or is a 
DeserializationSchema a must for reading data from Kafka? Also, AvroInputFormat 
doesn't have any Javadoc.
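
For context on questions 1 and 2: Avro's raw binary encoding carries no schema 
and no field names in the payload, which is why the reader has to be given the 
schema from the .avsc file. Below is a minimal, stdlib-only sketch of that idea. 
It assumes a hypothetical record with two fields, a long "id" followed by a 
string "name"; the class and method names are made up for illustration. In 
practice Avro's own readers (for example a GenericDatumReader constructed with 
the parsed Schema) do this work, so this is not a suggested implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class AvroBinarySketch {

    // Reads one Avro long: variable-length bytes, zigzag-encoded.
    static long readLong(ByteBuffer buf) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // undo the zigzag mapping
    }

    // Reads one Avro string: a long byte length followed by UTF-8 bytes.
    static String readString(ByteBuffer buf) {
        int length = (int) readLong(buf);
        byte[] utf8 = new byte[length];
        buf.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // Decodes the hypothetical record {id: long, name: string}.
    // The field order and types come from the schema, not from the bytes.
    static String decodeRecord(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        long id = readLong(buf);
        String name = readString(buf);
        return "id=" + id + ", name=" + name;
    }

    public static void main(String[] args) {
        byte[] message = {0x06, 0x04, 0x61, 0x62}; // id=3, name="ab"
        System.out.println(decodeRecord(message)); // prints "id=3, name=ab"
    }
}
```

The same four bytes would decode to something else under a different schema, 
so the schema used for reading has to be compatible with the one used for 
writing.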





Thanks & Regards,
Zeeshan Alam




From: Stephan Ewen [mailto:se...@apache.org]
Sent: Tuesday, August 02, 2016 7:52 PM
To: user@flink.apache.org
Subject: Re: What is the recommended way to read AVRO data from Kafka using 
flink.

Hi!

I think this is a known limitation for Flink 1.0 and it is fixed in Flink 1.1

Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691

Here is the mail thread:
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bplu+xvpwqrwd3c8ynuk3iwk9aqvgrc...@mail.gmail.com%3E

You could try and use the latest release candidate to get the fix: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html

The release is also in progress, so the fix should be out in a stable release soon.

Greetings,
Stephan


On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan <zeeshan.a...@fmr.com> wrote:
Hi,

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from Kafka. I 
have the AVRO schema file that was used to write the data to Kafka. 
Here 
https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
you mention that using the GenericData.Record type is possible with 
Flink, but not recommended: since the record contains the full schema, it's very 
data intensive and thus probably slow to use. So what is the recommended way to 
read AVRO data from Kafka using Flink?

public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       Properties properties = new Properties();
       properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
       properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
       properties.setProperty("group.id", "Zeeshantest");
       AvroDeserializationSchema<GenericData.Record> avroSchema =
              new AvroDeserializationSchema<>(GenericData.Record.class);
       FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
              new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
       DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
       messageStream.rebalance().print();
       env.execute("Flink AVRO KAFKA Test");
}

This is the AvroDeserializationSchema that I am using.


public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

       private static final long serialVersionUID = 4330538776656642778L;

       private final Class<T> avroType;
       private transient DatumReader<T> reader;
       private transient BinaryDecoder decoder;

       public AvroDeserializationSchema(Class<T> avroType) {
              this.avroType = avroType;
       }

       @Override
       public T deserialize(byte[] message) {
              ensureInitialized();
              try {
                     decoder = DecoderFactory.get().binaryDecoder(message, decoder);
                     return reader.read(null, decoder);
              } catch (Exception e) {
                     throw new RuntimeException(e);
              }
       }

       @Override
       public boolean isEndOfStream(T nextElement) {
              return false;
       }

       @Override
       public TypeInformation<T> getProducedType() {
              return TypeExtractor.getForClass(avroType);
       }

       private void ensureInitialized() {
              if (reader == null) {
                     if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                           reader = new SpecificDatumReader<T>(avroType);
                     } else {
                           reader = new ReflectDatumReader<T>(avroType);
                     }
              }
       }
}

On running this I am getting java.lang.Exception: Not a Specific class: class 
org.apache.avro.generic.GenericData$Record.
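
A side note on the transient fields and ensureInitialized() in the 
AvroDeserializationSchema above: the schema object is Serializable and is 
shipped to the workers via Java serialization, which skips transient fields, so 
they have to be rebuilt lazily on first use. A minimal, stdlib-only sketch of 
that pattern follows. LazySchema and FakeReader are hypothetical stand-ins, not 
Flink or Avro types, and the sketch does not address the exception itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazyInitDemo {

    // Hypothetical helper that is deliberately NOT Serializable.
    static class FakeReader {
        String read(String typeName) {
            return "schema for " + typeName;
        }
    }

    // Hypothetical stand-in for a deserialization schema class.
    static class LazySchema implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String typeName;        // serialized with the object
        private transient FakeReader reader;  // skipped by Java serialization

        LazySchema(String typeName) {
            this.typeName = typeName;
        }

        boolean isInitialized() {
            return reader != null;
        }

        String describe() {
            ensureInitialized();
            return reader.read(typeName);
        }

        // Rebuilds the transient helper on first use after deserialization.
        private void ensureInitialized() {
            if (reader == null) {
                reader = new FakeReader();
            }
        }
    }

    // Serializes and deserializes the object in memory.
    static LazySchema roundTrip(LazySchema schema) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(schema);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazySchema) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LazySchema original = new LazySchema("MyRecord");
        original.describe();                       // initializes the helper locally
        LazySchema copy = roundTrip(original);
        System.out.println(copy.isInitialized());  // prints "false"
        System.out.println(copy.describe());       // prints "schema for MyRecord"
    }
}
```

Without the transient keyword, writeObject would fail with a 
NotSerializableException because FakeReader is not Serializable.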

Thanks & Regards
Zeeshan Alam


