Thanks.
I had already found the second solution in the mail archive.
I also posted it as an answer to this question:
https://stackoverflow.com/a/55917157/1872639
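
A rough sketch of why that second solution type-checks is below. It does not use the real Kafka, Confluent or Beam classes: Deserializer, Coder, UntypedDeserializer (standing in for KafkaAvroDeserializer) and the toy "name,age" decoding are all stand-ins invented for illustration, and the helper method is only assumed to have the same shape as withValueDeserializerAndCoder. The point is just that a deserializer declared as Deserializer<MyClass> lets both arguments agree on the value type.

```java
import java.nio.charset.StandardCharsets;

public class TypedDeserializerSketch {

    // Stand-in for Kafka's Deserializer<T> (assumption: simplified to one method).
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Stand-in for Beam's Coder<T>; only the type parameter matters here.
    interface Coder<T> {}

    static class MyClass {
        final String name;
        final String age;

        MyClass(String name, String age) {
            this.name = name;
            this.age = age;
        }
    }

    // Stand-in for KafkaAvroDeserializer: it is typed as Deserializer<Object>.
    static class UntypedDeserializer implements Deserializer<Object> {
        @Override
        public Object deserialize(String topic, byte[] data) {
            // Toy "name,age" decoding instead of real Avro decoding.
            String[] parts = new String(data, StandardCharsets.UTF_8).split(",", 2);
            return new MyClass(parts[0], parts[1]);
        }
    }

    // The second option: a deserializer whose declared value type is MyClass.
    static class MyClassDeserializer implements Deserializer<MyClass> {
        private final UntypedDeserializer inner = new UntypedDeserializer();

        @Override
        public MyClass deserialize(String topic, byte[] data) {
            // Delegate to the untyped deserializer and narrow the result.
            return (MyClass) inner.deserialize(topic, data);
        }
    }

    // Hypothetical helper: both arguments must agree on the same V.
    static <V> String withValueDeserializerAndCoder(
            Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
        return valueDeserializer.getSimpleName();
    }

    public static void main(String[] args) {
        Coder<MyClass> coder = new Coder<MyClass>() {};

        // Compiles: V is MyClass for both the deserializer and the coder.
        String chosen = withValueDeserializerAndCoder(MyClassDeserializer.class, coder);

        // Passing UntypedDeserializer.class with the same coder would not compile,
        // because V would have to be both Object and MyClass.

        byte[] payload = "alice,30".getBytes(StandardCharsets.UTF_8);
        MyClass value = new MyClassDeserializer().deserialize("topic", payload);
        System.out.println(chosen + " -> " + value.name + ", " + value.age);
    }
}
```

With the real libraries the typed deserializer would wrap or extend the Confluent Avro deserializer rather than this toy one.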

Yohei Onishi


On Tue, Apr 30, 2019 at 10:13 PM Alexey Romanenko <[email protected]>
wrote:

> Hi Yohei,
>
> In general, this code is correct, but it requires an additional cast and
> extracting the result of KafkaIO.read() into a local variable, like this:
>
>     KafkaIO.Read read = KafkaIO.<Long, MyClass>read()
>         .withKeyDeserializer(LongDeserializer.class)
>         .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
>             AvroCoder.of(MyClass.class));
>     p.apply(read);
>
> The whole code example is here:
>
> https://github.com/aromanenko-dev/beam-issues/blob/master/io/src/main/java/org/apache/beam/issues/io/KafkaAvro.java
>
> Another option is to extend KafkaAvroDeserializer and override its
> deserialize() method so that it returns the proper type (MyClass) instead
> of the default Object.
>
>
> On 30 Apr 2019, at 08:18, Yohei Onishi <[email protected]> wrote:
>
> Hi,
>
> I am currently developing an application that consumes Avro-formatted
> messages from Kafka, and I am facing the error below. Can anybody help me
> fix it?
>
> My code looks like this. I am trying to deserialize Avro-formatted messages
> to MyClass (a POJO) using KafkaAvroDeserializer.
>
> public class MyClassConsumer {
>>     public static void main(String[] args) {
>>         PipelineOptions options = PipelineOptionsFactory.create();
>>         Pipeline p = Pipeline.create(options);
>>         PCollection<KV<Long, MyClass>> input = p.apply(
>>                 KafkaIO.<Long, Object>read()
>>                         .withKeyDeserializer(LongDeserializer.class)
>>                         .withValueDeserializerAndCoder(
>>                                 KafkaAvroDeserializer.class,
>>                                 AvroCoder.of(MyClass.class))
>>         );
>>         p.run();
>>     }
>> }
>> @DefaultCoder(AvroCoder.class)
>> public class MyClass{
>>     String name;
>>     String age;
>>     MyClass(){
>>     }
>>     MyClass(String n, String a) {
>>         this.name= n;
>>         this.age= a;
>>     }
>> }
>
>
> but I got this error.
>
> Compilation failure
>> [ERROR]
>> /Users/01087872/Documents/fr-det-avro-sample/src/main/java/examples/MyClassConsumer.java:[19,17]
>> incompatible types: inference variable T has incompatible equality
>> constraints java.lang.Object,examples.MyClass
>
>
> This is the same issue mentioned in this Stack Overflow question:
> https://stackoverflow.com/questions/54755668/how-to-deserialising-kafka-avro-messages-using-apache-beam#comment98480658_54756442
>
> If you find a solution please let me know.
>
> Yohei Onishi
>
>
>
