Hi Yohei, 

In general, this code is correct, but it requires an additional cast and 
extracting the result of KafkaIO.read() into a local variable, like this: 

KafkaIO.Read read = KafkaIO.<Long, MyClass>read()
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
        AvroCoder.of(MyClass.class));
p.apply(read);

The whole code example is here: 
https://github.com/aromanenko-dev/beam-issues/blob/master/io/src/main/java/org/apache/beam/issues/io/KafkaAvro.java
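
To illustrate why the cast is needed, here is a reduced sketch that compiles 
with the JDK alone. All types in it are stand-ins I made up for illustration 
(Deserializer, Coder, ObjectDeserializer, Reader); they only mimic the shape of 
the Kafka and Beam classes and are not the real API. ObjectDeserializer plays 
the role of KafkaAvroDeserializer, which is typed as a deserializer of Object.

```java
public class RawCastSketch {

    // Stand-ins (assumptions) for Kafka's Deserializer<T> and Beam's Coder<T>.
    interface Deserializer<T> { T deserialize(byte[] data); }
    interface Coder<T> { }

    static class MyClass { }

    // Plays the role of KafkaAvroDeserializer: fixed to Deserializer<Object>.
    static class ObjectDeserializer implements Deserializer<Object> {
        @Override
        public Object deserialize(byte[] data) { return new MyClass(); }
    }

    // Plays the role of KafkaIO.Read, reduced to the value type only.
    static class Reader<V> {
        Class<?> deserializerClass;

        Reader<V> withValueDeserializerAndCoder(
                Class<? extends Deserializer<V>> deserializer, Coder<V> coder) {
            this.deserializerClass = deserializer;
            return this;
        }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static Reader build() {
        Coder<MyClass> coder = new Coder<MyClass>() { };
        // Without the (Class) cast this call does not compile, because
        // ObjectDeserializer is a Deserializer<Object>, not a Deserializer<MyClass>.
        Reader read = new Reader<MyClass>()
                .withValueDeserializerAndCoder((Class) ObjectDeserializer.class, coder);
        return read;
    }

    public static void main(String[] args) {
        System.out.println(build().deserializerClass.getSimpleName());
    }
}
```

The raw cast trades compile-time checking for an unchecked warning, so the 
value type is only verified at runtime when records are actually deserialized.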

Another option is to extend KafkaAvroDeserializer and override its 
deserialize() method so that it returns the proper type (MyClass) instead of 
the default Object.
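
A minimal sketch of that subclassing idea, again with stand-in types so it 
compiles without Kafka on the classpath. Deserializer, ObjectDeserializer and 
MyClassDeserializer are hypothetical names, and the byte-to-object decoding is 
a placeholder, not real Avro decoding. In real code the base class would be 
KafkaAvroDeserializer. Note that the subclass here still implements 
Deserializer<Object>; only the method's return type is narrowed.

```java
import java.nio.charset.StandardCharsets;

public class TypedDeserializerSketch {

    // Stand-in (assumption) for Kafka's Deserializer<T>; only one method is modeled.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // The POJO from the question, reduced to a single field.
    static class MyClass {
        final String name;
        MyClass(String name) { this.name = name; }
    }

    // Plays the role of KafkaAvroDeserializer: returns Object.
    static class ObjectDeserializer implements Deserializer<Object> {
        @Override
        public Object deserialize(String topic, byte[] data) {
            // Placeholder decoding: treat the payload as a UTF-8 name.
            return new MyClass(new String(data, StandardCharsets.UTF_8));
        }
    }

    // Subclass narrows the return type with a covariant override.
    static class MyClassDeserializer extends ObjectDeserializer {
        @Override
        public MyClass deserialize(String topic, byte[] data) {
            return (MyClass) super.deserialize(topic, data);
        }
    }

    public static void main(String[] args) {
        MyClassDeserializer d = new MyClassDeserializer();
        MyClass value = d.deserialize("topic", "alice".getBytes(StandardCharsets.UTF_8));
        System.out.println(value.name);
    }
}
```

The cast inside deserialize() fails with a ClassCastException if the payload 
decodes to some other type, so the mismatch surfaces at the point of decoding.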


> On 30 Apr 2019, at 08:18, Yohei Onishi <[email protected]> wrote:
> 
> Hi,
> 
> I am currently developing an application that consumes Avro-format messages 
> from Kafka. I am facing the following error. Can anybody help me fix it?
> 
> My code looks like this. I am trying to deserialize Avro-format messages into 
> MyClass (a POJO) using KafkaAvroDeserializer.
> 
> public class MyClassConsumer {
>     public static void main(String[] args) {
>         PipelineOptions options = PipelineOptionsFactory.create();
>         Pipeline p = Pipeline.create(options);
>         PCollection<KV<Long, MyClass>> input = p.apply(KafkaIO.<Long, Object>read()
>                 .withKeyDeserializer(LongDeserializer.class)
>                 .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>                         AvroCoder.of(MyClass.class))
>         );
>         p.run();
>     }
> }
> @DefaultCoder(AvroCoder.class)
> public class MyClass{
>     String name;
>     String age;
>     MyClass(){
>     }
>     MyClass(String n, String a) {
>         this.name = n;
>         this.age = a;
>     }
> }
> 
> But I got this error:
> 
> Compilation failure
> [ERROR] 
> /Users/01087872/Documents/fr-det-avro-sample/src/main/java/examples/MyClassConsumer.java:[19,17]
>  incompatible types: inference variable T has incompatible equality 
> constraints java.lang.Object,examples.MyClass
> 
> This is the same issue mentioned in this Stack Overflow question: 
> https://stackoverflow.com/questions/54755668/how-to-deserialising-kafka-avro-messages-using-apache-beam#comment98480658_54756442
>  
> If you find a solution please let me know.
> 
> Yohei Onishi
