Not quite sure, will try to find out today. Thanks, Dayong
> On Nov 2, 2016, at 9:59 PM, "Tauzell, Dave" <dave.tauz...@surescripts.com> > wrote: > > Is Kafka connect adding some bytes to the beginning of the avro with the > scheme registry id? > > Dave > >> On Nov 2, 2016, at 18:43, Will Du <will...@gmail.com> wrote: >> >> By using the kafka-avro-console-consumer I am able to get rich message from >> kafka connect with AvroConvert, but it got no output except schema from Flink >> >> By using the producer with defaultEncoding, the kafka-avro-console-consumer >> throws exceptions show how. But Flink consumer works. But my target is to >> get Flink costume avro data produced by Kafka connect >> >>> On Nov 2, 2016, at 7:36 PM, Will Du <will...@gmail.com> wrote: >>> >>> >>> On Nov 2, 2016, at 7:31 PM, Will Du <will...@gmail.com >>> <mailto:will...@gmail.com>> wrote: >>> >>> Hi folks, >>> I am trying to consume avro data from Kafka in Flink. The data is produced >>> by Kafka connect using AvroConverter. I have created a >>> AvroDeserializationSchema.java >>> <https://gist.github.com/datafibers/ae9d624b6db44865ae14defe8a838123> used >>> by Flink consumer. Then, I use following code to read it. >>> >>> public static void main(String[] args) throws Exception { >>> StreamExecutionEnvironment env = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> Properties properties = new Properties(); >>> properties.setProperty("bootstrap.servers", “localhost:9092"); >>> properties.setProperty("zookeeper.connect", “localhost:2181”); >>> Schema schema = new Parser().parse("{" + "\"name\": \"test\", " >>> + "\"type\": \"record\", " >>> + "\"fields\": " >>> +" [ " >>> + " { \"name\": \"name\", \"type\": >>> \"string\" }," >>> + " { \"name\": \"symbol\", \"type\": >>> \"string\" }," >>> + " { \"name\": \"exchange\", >>> \"type\": \"string\"}" >>> + "] " >>> +"}"); >>> >>> AvroDeserializationSchema avroSchema = new >>> AvroDeserializationSchema<>(schema); >>> FlinkKafkaConsumer09<GenericRecord> kafkaConsumer = >>> new FlinkKafkaConsumer09<>("myavrotopic",avroSchema, >>> properties); >>> DataStream<GenericRecord> messageStream = >>> env.addSource(kafkaConsumer); >>> messageStream.rebalance().print(); >>> env.execute("Flink AVRO KAFKA Test"); >>> } >>> >>> Once, I run the code, I am able to get the schema information only as >>> follows. >>> {"name":"", "symbol":"", "exchange":""} >>> {"name":"", "symbol":"", "exchange":""} >>> {"name":"", "symbol":"", "exchange":""} >>> {"name":"", "symbol":"", "exchange":”"} >>> >>> Could anyone help to find out the issues why I cannot decode it? >>> >>> Further troubleshooting, I found out if I use a kafka producer here >>> <https://gist.github.com/datafibers/d063b255b50fa34515c0ac9e24d4485c> to >>> send the avro data especially using kafka.serializer.DefaultEncoder. Above >>> code can get correct result. Does any body know how to either set >>> DefaultEncoder in Kafka Connect or set it when writing customized kafka >>> connect? Or in the other way, how should I modify the >>> AvroDeserializationSchema.java for instead? >>> >>> Thanks, I’ll post this to the Flink user group as well. >>> Will >> > This e-mail and any files transmitted with it are confidential, may contain > sensitive information, and are intended solely for the use of the individual > or entity to whom they are addressed. If you have received this e-mail in > error, please notify the sender by reply e-mail immediately and destroy all > copies of the e-mail and any attachments.