Not quite sure, will try to find out today.

Thanks,
Dayong

> On Nov 2, 2016, at 9:59 PM, "Tauzell, Dave" <dave.tauz...@surescripts.com> 
> wrote:
> 
> Is Kafka Connect adding some bytes to the beginning of the avro with the 
> schema registry id?
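> 
> If it is, a quick check on the raw bytes would show it. A minimal sketch, 
> assuming the standard Confluent wire format (one magic byte plus a 4-byte 
> big-endian schema id in front of the avro payload); record.value() here is 
> just a stand-in for however you get the raw message bytes:
> 
>     // Assumed Confluent wire format: [0x00][4-byte schema id][avro payload]
>     byte[] raw = record.value();  // hypothetical: raw bytes read from the topic
>     if (raw.length > 5 && raw[0] == 0x00) {
>         int schemaId = java.nio.ByteBuffer.wrap(raw, 1, 4).getInt();
>         System.out.println("schema registry id = " + schemaId);
>     }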
> 
> Dave
> 
>> On Nov 2, 2016, at 18:43, Will Du <will...@gmail.com> wrote:
>> 
>> Using the kafka-avro-console-consumer, I am able to read the full messages 
>> produced by Kafka Connect with AvroConverter, but from Flink I get no output 
>> other than the schema fields (a sketch of the console-consumer command I use 
>> is below).
>> 
>> Using a producer with the DefaultEncoder instead, the 
>> kafka-avro-console-consumer throws exceptions, but the Flink consumer works. 
>> My goal, though, is to have Flink consume the avro data produced by Kafka 
>> Connect.
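>> 
>> For reference, I read the topic with roughly the following command; the 
>> schema registry url is an assumption from my local setup:
>> 
>>     kafka-avro-console-consumer --zookeeper localhost:2181 \
>>         --topic myavrotopic --from-beginning \
>>         --property schema.registry.url=http://localhost:8081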
>> 
>>> On Nov 2, 2016, at 7:31 PM, Will Du <will...@gmail.com> wrote:
>>> 
>>> Hi folks,
>>> I am trying to consume avro data from Kafka in Flink. The data is produced 
>>> by Kafka Connect using AvroConverter. I have created an 
>>> AvroDeserializationSchema.java 
>>> <https://gist.github.com/datafibers/ae9d624b6db44865ae14defe8a838123> used 
>>> by the Flink consumer. Then I use the following code to read it.
>>> 
>>> public static void main(String[] args) throws Exception {
>>>     StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>     Properties properties = new Properties();
>>>     properties.setProperty("bootstrap.servers", "localhost:9092");
>>>     properties.setProperty("zookeeper.connect", "localhost:2181");
>>> 
>>>     Schema schema = new Schema.Parser().parse("{"
>>>         + "\"name\": \"test\", "
>>>         + "\"type\": \"record\", "
>>>         + "\"fields\": ["
>>>         + "  { \"name\": \"name\", \"type\": \"string\" },"
>>>         + "  { \"name\": \"symbol\", \"type\": \"string\" },"
>>>         + "  { \"name\": \"exchange\", \"type\": \"string\" }"
>>>         + "]}");
>>> 
>>>     AvroDeserializationSchema<GenericRecord> avroSchema =
>>>         new AvroDeserializationSchema<>(schema);
>>>     FlinkKafkaConsumer09<GenericRecord> kafkaConsumer =
>>>         new FlinkKafkaConsumer09<>("myavrotopic", avroSchema, properties);
>>>     DataStream<GenericRecord> messageStream = env.addSource(kafkaConsumer);
>>>     messageStream.rebalance().print();
>>>     env.execute("Flink AVRO KAFKA Test");
>>> }
>>> 
>>> Once I run the code, I only get the schema fields with empty values, as 
>>> follows:
>>> {"name":"", "symbol":"", "exchange":""}
>>> {"name":"", "symbol":"", "exchange":""}
>>> {"name":"", "symbol":"", "exchange":""}
>>> {"name":"", "symbol":"", "exchange":""}
>>> 
>>> Could anyone help me figure out why I cannot decode the data?
>>> 
>>> With further troubleshooting, I found that if I use the kafka producer here 
>>> <https://gist.github.com/datafibers/d063b255b50fa34515c0ac9e24d4485c> to 
>>> send the avro data, specifically using kafka.serializer.DefaultEncoder, the 
>>> code above gets the correct result. Does anybody know how to either set the 
>>> DefaultEncoder in Kafka Connect, or set it when writing a customized Kafka 
>>> connector? Or, the other way around, how should I modify 
>>> AvroDeserializationSchema.java instead? A sketch of the direction I am 
>>> considering follows below.
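>>> 
>>> Here is a minimal sketch of that modification, assuming Kafka Connect's 
>>> AvroConverter prepends the Confluent 5-byte header (a magic byte plus a 
>>> 4-byte schema registry id) before the avro payload; the class name and the 
>>> hard-coded offset are illustrative and untested:
>>> 
>>>     import java.io.IOException;
>>>     import org.apache.avro.Schema;
>>>     import org.apache.avro.generic.GenericDatumReader;
>>>     import org.apache.avro.generic.GenericRecord;
>>>     import org.apache.avro.io.BinaryDecoder;
>>>     import org.apache.avro.io.DecoderFactory;
>>>     import org.apache.flink.api.common.typeinfo.TypeInformation;
>>>     import org.apache.flink.api.java.typeutils.TypeExtractor;
>>>     import org.apache.flink.streaming.util.serialization.DeserializationSchema;
>>> 
>>>     public class ConfluentAvroDeserializationSchema
>>>             implements DeserializationSchema<GenericRecord> {
>>> 
>>>         private final String schemaString;  // Schema itself is not Serializable
>>>         private transient Schema schema;
>>>         private transient GenericDatumReader<GenericRecord> reader;
>>> 
>>>         public ConfluentAvroDeserializationSchema(Schema schema) {
>>>             this.schemaString = schema.toString();
>>>         }
>>> 
>>>         @Override
>>>         public GenericRecord deserialize(byte[] message) {
>>>             if (reader == null) {
>>>                 schema = new Schema.Parser().parse(schemaString);
>>>                 reader = new GenericDatumReader<>(schema);
>>>             }
>>>             try {
>>>                 // Skip the assumed 5-byte Confluent header before decoding.
>>>                 BinaryDecoder decoder = DecoderFactory.get()
>>>                     .binaryDecoder(message, 5, message.length - 5, null);
>>>                 return reader.read(null, decoder);
>>>             } catch (IOException e) {
>>>                 throw new RuntimeException("Failed to decode avro record", e);
>>>             }
>>>         }
>>> 
>>>         @Override
>>>         public boolean isEndOfStream(GenericRecord nextElement) {
>>>             return false;
>>>         }
>>> 
>>>         @Override
>>>         public TypeInformation<GenericRecord> getProducedType() {
>>>             return TypeExtractor.getForClass(GenericRecord.class);
>>>         }
>>>     }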
>>> 
>>> Thanks, I’ll post this to the Flink user group as well.
>>> Will
>> 
