Hi Stephan,

My AvroDeserializationSchema worked fine with a different Kafka topic. It seems 
that the previous Kafka topic contained heterogeneous data, with both AVRO- and 
JSON-formatted messages. Thanks for your time ☺.
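
For anyone who hits the same mixed-topic problem, here is a minimal, JDK-only sketch of a heuristic that flags payloads that look like JSON text before they reach an Avro decoder. The class name PayloadSniffer and the method looksLikeJson are hypothetical and not part of Flink or Avro. The check is only a heuristic: it tests for valid UTF-8 wrapped in {...} or [...] and does not parse the JSON.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not a Flink or Avro class): guesses whether a Kafka
// message payload is JSON text rather than Avro binary.
final class PayloadSniffer {

    private PayloadSniffer() {
    }

    /** Returns true if the payload is valid UTF-8 whose trimmed text is wrapped in {...} or [...]. */
    static boolean looksLikeJson(byte[] payload) {
        if (payload == null || payload.length < 2) {
            return false;
        }
        final String text;
        try {
            // Strict decoding: malformed byte sequences raise an exception
            // instead of being silently replaced.
            text = StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(payload))
                    .toString()
                    .trim();
        } catch (CharacterCodingException e) {
            return false; // not valid UTF-8, so not JSON text
        }
        if (text.length() < 2) {
            return false;
        }
        char first = text.charAt(0);
        char last = text.charAt(text.length() - 1);
        return (first == '{' && last == '}') || (first == '[' && last == ']');
    }
}
```

A deserialize method could call such a check first and skip or log the message when it returns true. An Avro-encoded payload could in principle pass the test by coincidence, so this suits diagnostics better than strict routing.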

Thanks & Regards
Zeeshan Alam

From: Stephan Ewen [mailto:se...@apache.org]
Sent: Thursday, August 04, 2016 6:00 PM
To: user@flink.apache.org
Subject: Re: What is the recommended way to read AVRO data from Kafka using 


To read data from Kafka, you need a DeserializationSchema. You could create one 
that wraps the AvroInputFormat, but an AvroDeserializationSchema would simply 
be an adjustment of the AvroInputFormat to the interface of the 

In your Avro DeserializationSchema, you can probably create the Avro readers 
internally with an Avro schema (I believe).
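
To illustrate why the reader has to be created from a schema: Avro binary data carries no field names or types, so the bytes can only be interpreted by walking the fields in schema order. The following is a minimal, JDK-only sketch of that idea and is not the Avro library. The class AvroBinarySketch is hypothetical, and it assumes a record schema with a long field followed by a string field. In real code an Avro DatumReader built from the schema does this work.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration (not the Avro API): hand-decodes two Avro
// primitives to show that the byte stream is meaningless without a schema.
final class AvroBinarySketch {

    private final byte[] buf;
    private int pos;

    AvroBinarySketch(byte[] buf) {
        this.buf = buf;
    }

    /** Reads an Avro long: variable-length, zig-zag encoded. */
    long readLong() {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift; // low 7 bits carry the payload
            shift += 7;
        } while ((b & 0x80) != 0);             // high bit set means "more bytes follow"
        return (raw >>> 1) ^ -(raw & 1);       // undo zig-zag encoding
    }

    /** Reads an Avro string: a long byte length followed by UTF-8 bytes. */
    String readString() {
        int length = (int) readLong();
        String value = new String(buf, pos, length, StandardCharsets.UTF_8);
        pos += length;
        return value;
    }
}
```

For the assumed schema (long, then string), the bytes 36 06 66 6f 6f decode to 27 and "foo" when readLong() is called before readString(). Read in any other order, the same bytes give different values or fail, which is why the DeserializationSchema needs the writer's schema.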


On Tue, Aug 2, 2016 at 4:53 PM, Alam, Zeeshan 
<zeeshan.a...@fmr.com<mailto:zeeshan.a...@fmr.com>> wrote:
Hi Stephan,

I went through one of the old mail threads 

Here it is mentioned that when reading from Kafka you are expected to define a 
DeserializationSchema. There is no out-of-the-box (de)serializer for Flink with 
Kafka, but it should not be very hard to add.

I have some questions:

1. As per FLINK-3691, you are adding GenericDatumReader, so I suppose I 
need to use it instead of DatumReader in my DeserializationSchema, which is 
required to read data from Kafka?

2. What is the recommended way to read AVRO binary data from Kafka if I have 
the AVRO schema file [*.avsc] with me? Is there a better more efficient 

3. Can AvroInputFormat be used to read Kafka data, or is a 
DeserializationSchema a must to read data from Kafka? Also, AvroInputFormat 
doesn’t have any Javadoc.

Thanks & Regards,
Zeeshan Alam

From: Stephan Ewen [mailto:se...@apache.org<mailto:se...@apache.org>]
Sent: Tuesday, August 02, 2016 7:52 PM
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: What is the recommended way to read AVRO data from Kafka using 


I think this is a known limitation in Flink 1.0, and it is fixed in Flink 1.1.

Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691

Here is the mail thread:

You could try and use the latest release candidate to get the 

The release is also in progress, so the fix should be out in a stable release soon.


On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan 
<zeeshan.a...@fmr.com<mailto:zeeshan.a...@fmr.com>> wrote:

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from Kafka. I 
have the AVRO schema file that was used to write the data to Kafka. 
You have mentioned that using the GenericData.Record type is possible with 
Flink, but not recommended: since the record contains the full schema, it is very 
data intensive and thus probably slow to use. So what is the recommended way to 
read AVRO data from Kafka using Flink?

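public static void main(String[] args) throws Exception {
       // The wrapped right-hand sides below are assumed to be the standard calls.
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       // Kafka connection settings (bootstrap.servers, zookeeper.connect, group.id) are not shown.
       Properties properties = new Properties();
       AvroDeserializationSchema<GenericData.Record> avroSchema =
                     new AvroDeserializationSchema<>(GenericData.Record.class);
       FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
                     new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
       DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
       env.execute("Flink AVRO KAFKA Test");
}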

This is the AvroDeserializationSchema that I am using.

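import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

       private static final long serialVersionUID = 4330538776656642778L;

       private final Class<T> avroType;
       // Avro readers and decoders are not serializable, so they are created lazily.
       private transient DatumReader<T> reader;
       private transient BinaryDecoder decoder;

       public AvroDeserializationSchema(Class<T> avroType) {
              this.avroType = avroType;
       }

       @Override
       public T deserialize(byte[] message) {
              ensureInitialized();
              try {
                     // Pass the previous decoder so Avro can reuse it.
                     decoder = DecoderFactory.get().binaryDecoder(message, decoder);
                     return reader.read(null, decoder);
              } catch (Exception e) {
                     throw new RuntimeException(e);
              }
       }

       @Override
       public boolean isEndOfStream(T nextElement) {
              return false;
       }

       @Override
       public TypeInformation<T> getProducedType() {
              return TypeExtractor.getForClass(avroType);
       }

       private void ensureInitialized() {
              if (reader == null) {
                     if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                            reader = new SpecificDatumReader<T>(avroType);
                     } else {
                            reader = new ReflectDatumReader<T>(avroType);
                     }
              }
       }
}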
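
The transient fields and the ensureInitialized() method in the class above follow a common pattern: Flink ships the DeserializationSchema instance to the workers using Java serialization, so non-serializable helpers are marked transient and rebuilt on first use. Below is a minimal, JDK-only sketch of that pattern. The class TransientReaderSketch and its Function-based "reader" are hypothetical stand-ins for the Avro reader and are not Flink or Avro API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.function.Function;

// Hypothetical stand-in for a deserialization schema: only the configuration
// is serialized, the non-serializable reader is rebuilt lazily.
final class TransientReaderSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String charsetName;                   // serializable configuration
    private transient Function<byte[], String> reader;  // dropped during serialization

    TransientReaderSketch(String charsetName) {
        this.charsetName = charsetName;
    }

    String decode(byte[] message) {
        ensureInitialized();
        return reader.apply(message);
    }

    boolean isInitialized() {
        return reader != null;
    }

    private void ensureInitialized() {
        if (reader == null) {
            final Charset charset = Charset.forName(charsetName);
            reader = bytes -> new String(bytes, charset);
        }
    }

    /** Serializes and deserializes the instance, similar to shipping it to a worker. */
    static TransientReaderSketch roundTrip(TransientReaderSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TransientReaderSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After a round trip the copy has no reader until decode() is called for the first time. This is why deserialize() has to call ensureInitialized() before it touches the reader.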

On running this I am getting java.lang.Exception: Not a Specific class: class 

Thanks & Regards
Zeeshan Alam
