Thanks. I have already found the second solution in the mail archive. I also provided it as answer to this question. https://stackoverflow.com/a/55917157/1872639
Yohei Onishi On Tue, Apr 30, 2019 at 10:13 PM Alexey Romanenko <[email protected]> wrote: > Hi Yohei, > > In general, this code is correct but it requires additional casting and > extracting the result of Kafka.read() into local variable, like this: > > > > > *KafkaIO.Read read = KafkaIO.<Long, > MyClass>read().withKeyDeserializer(LongDeserializer.class) > .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, > AvroCoder.of(MyClass.class));p.apply(read);* > > The whole code example is here: > > https://github.com/aromanenko-dev/beam-issues/blob/master/io/src/main/java/org/apache/beam/issues/io/KafkaAvro.java > > Another option is to extend *KafkaAvroDeserializer *and override your own > *deserialize()* method which will return proper type (*MyClass*) instead > of default *Object*. > > > On 30 Apr 2019, at 08:18, Yohei Onishi <[email protected]> wrote: > > Hi, > > I am currently developing an application that consumes Avro format message > from Kafka. I am facing the following error. Can anybody help me fix it? > > My code looks like this. I am trying to deserialize Avro format message to > MyClass (POJO) using KafkaAvroDeserializer > > public class MyClassConsumer { >> public static void main(String[] args) { >> PipelineOptions options = PipelineOptionsFactory.create(); >> Pipeline p = Pipeline.create(options); >> PCollection<KV<Long, MyClass>> input = p.apply(KafkaIO.<Long, >> Object>read() >> .withKeyDeserializer(LongDeserializer.class) >> >> .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, >> AvroCoder.of(MyClass.class)) >> ); >> p.run(); >> } >> } >> @DefaultCoder(AvroCoder.class) >> public class MyClass{ >> String name; >> String age; >> MyClass(){ >> } >> MyClass(String n, String a) { >> this.name= n; >> this.age= a; >> } >> } > > > but I got this error. > > Compilation failure >> [ERROR] >> /Users/01087872/Documents/fr-det-avro-sample/src/main/java/examples/MyClassConsumer.java:[19,17] >> incompatible types: inference variable T has incompatible equality >> constraints java.lang.Object,examples.MyClass > > > This is the same issue this stackdriver question mentioned. > https://stackoverflow.com/questions/54755668/how-to-deserialising-kafka-avro-messages-using-apache-beam#comment98480658_54756442 > > If you find a solution please let me know. > > Yohei Onishi > > >
