Hi Stephan,

I went through one of the old mail threads: http://mail-archives.apache.org/mod_mbox/flink-user/201510.mbox/%3CCANC1h_vq-TVjTNhXyYLoVso7GRGkdGWioM5Ppg%3DGoQPjvigqYg%40mail.gmail.com%3E
Here it is mentioned that when reading from Kafka you are expected to define a DeserializationSchema, and that there is no out-of-the-box (de)serializer for Flink with Kafka, but it should not be very hard to add.

I have some questions:

1. As per FLINK-3691 you are adding GenericDatumReader, so I suppose I need to use it instead of DatumReader in my DeserializationSchema, which is required to read data from Kafka?

2. What is the recommended way to read AVRO binary data from Kafka if I have the AVRO schema file [*.avsc] with me? Is there a better, more efficient approach?

3. Can AvroInputFormat be used to read Kafka data, or is a DeserializationSchema a must to read data from Kafka? Also, AvroInputFormat doesn't have any Javadoc with it.

Thanks & Regards,
Zeeshan Alam

From: Stephan Ewen [mailto:se...@apache.org]
Sent: Tuesday, August 02, 2016 7:52 PM
To: user@flink.apache.org
Subject: Re: What is the recommended way to read AVRO data from Kafka using flink.

Hi!

I think this is a known limitation for Flink 1.0 and it is fixed in Flink 1.1.

Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691

Here is the mail thread: http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bplu+xvpwqrwd3c8ynuk3iwk9aqvgrc...@mail.gmail.com%3E

You could try and use the latest release candidate to get the fix: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html

The release is also happening, so it should be out in a stable release soon.

Greetings,
Stephan

On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan <zeeshan.a...@fmr.com> wrote:

Hi,

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from Kafka. I have the AVRO schema file that was used to write the data to Kafka.
Here https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html you have mentioned that using the GenericData.Record type is possible with Flink, but not recommended: since the record contains the full schema, it's very data intensive and thus probably slow to use. So what is the recommended way to read AVRO data from Kafka using Flink?

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
    properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
    properties.setProperty("group.id", "Zeeshantest");

    AvroDeserializationSchema<GenericData.Record> avroSchema =
            new AvroDeserializationSchema<>(GenericData.Record.class);
    FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
            new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

    DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();

    env.execute("Flink AVRO KAFKA Test");
}

This is the AvroDeserializationSchema that I am using.
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 4330538776656642778L;

    private final Class<T> avroType;
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }
}

On running this I am getting java.lang.Exception: Not a Specific class: class org.apache.avro.generic.GenericData$Record.

Thanks & Regards
Zeeshan Alam
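
A side note on the transient fields in the schema class above: DeserializationSchema extends Serializable, and the schema object is serialized when the job is shipped to the workers, so the transient reader and decoder arrive there as null and ensureInitialized() rebuilds them on first use. Below is a minimal, JDK-only sketch of that pattern. LazySchema, Reader and roundTrip are hypothetical stand-ins, not Flink or Avro classes, and the sketch does not address the GenericData.Record error itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class; it uses no Flink or Avro types.
class LazySchemaDemo {

    // Stand-in for a helper such as a DatumReader: deliberately NOT Serializable.
    static class Reader {
        String read(String typeName, byte[] message) {
            return typeName + ":" + message.length;
        }
    }

    // Stand-in for a DeserializationSchema implementation.
    static class LazySchema implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String typeName;     // travels with the serialized object
        private transient Reader reader;   // skipped by Java serialization

        LazySchema(String typeName) {
            this.typeName = typeName;
        }

        boolean readerInitialized() {
            return reader != null;
        }

        String deserialize(byte[] message) {
            ensureInitialized();
            return reader.read(typeName, message);
        }

        // Rebuilds the transient helper the first time it is needed.
        private void ensureInitialized() {
            if (reader == null) {
                reader = new Reader();
            }
        }
    }

    // Serializes and deserializes the schema in memory, imitating shipping it to a worker.
    static LazySchema roundTrip(LazySchema schema) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(schema);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazySchema) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LazySchema original = new LazySchema("Record");
        original.deserialize(new byte[3]);            // initializes the reader locally

        LazySchema copy = roundTrip(original);
        System.out.println(copy.readerInitialized()); // false: the transient field was dropped
        System.out.println(copy.deserialize(new byte[] {1, 2})); // Record:2
        System.out.println(copy.readerInitialized()); // true: rebuilt lazily
    }
}
```

If the reader were a regular (non-transient) field, writeObject would fail here with a NotSerializableException, which is why the helper is created lazily rather than in the constructor.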